kafka-go 实现SASL身份验证连接

kafka-go 实现SASL身份验证连接

代码来自kafka-go github issue 目前尚未调通!

通过允许用户在kafka.Dialer结构上设置SASLClient字段来添加对SASL身份验证的支持。

用户必须提供自己的kafka.SASLClient实现,因为目前没有适用于Go的SASL库,并且支持Kafka支持的所有实现,这将允许kafka-go在不更改核心库的情况下支持更多SASL机制。

测试已更新为针对实时服务器测试PLAIN身份验证。该实现还使用SCRAM-SHA-256和SCRAM-SHA-512进行了测试,针对0.11.0.3和2.0.1。

此提交引入了四个新的kafka调用,只有在设置了SASLClient 时才会使用它:

  • ApiVersionsRequest v1
  • SaslHandshakeRequest v0和v1
  • SaslAuthenticateRequestV0
  • 原始SASL数据包

有关身份验证序列的更多信息,请参阅https://kafka.apache.org/protocol#sasl_handshake

TODO:对于Kerberos和SCRAM-SHA-256-PLUS支持,可能需要扩展kafka.SASLClient 的接口方法。

使用github.com/xdg/scram实现SCRAM-SHA-512的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import (
"context"
"crypto/sha512"
"hash"
"log"

kafka "github.com/segmentio/kafka-go"
"github.com/xdg/scram"
)

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type SCRAMClient struct {
client *scram.ClientConversation
}

func (s *SCRAMClient) Mechanism() string { return "SCRAM-SHA-512" }

func (s *SCRAMClient) Start(ctx context.Context) ([]byte, error) {
str, err := s.client.Step("")
return []byte(str), err
}

func (s *SCRAMClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
str, err := s.client.Step(string(challenge))
return s.client.Done(), []byte(str), err
}

func main() {
scramClient, err := SHA512.NewClient("adminscram", "admin-secret", "")
if err != nil {
log.Fatal(err)
}

r := kafka.NewReader(kafka.ReaderConfig{
Dialer: &kafka.Dialer{
SASLClient: func() kafka.SASLClient { return &SCRAMClient{scramClient.NewConversation()} },
},
Brokers: []string{"localhost:9094"},
Topic: "test-writer-1",
})